package xconcurrent

import (
	"context"
	"gitee.com/xfrm/middleware/xlog"
	"sync"
)

// PageExecute splits input into consecutive pages of at most pageSize
// elements, runs exec on those pages with up to concurrentCount worker
// goroutines, and returns the concatenation of every page's output.
//
// Behavior notes:
//   - Outputs are concatenated in page order, i.e. the order of input.
//   - exec is never called when input is empty.
//   - pageSize <= 0 places the whole input in a single page.
//   - concurrentCount <= 0 is treated as 1; no more workers than pages are
//     started.
//   - When exec returns an error it is logged as a warning and processing
//     continues; output returned alongside the error is still included.
//
// The returned slice is never nil. PageExecute does not watch ctx for
// cancellation itself; ctx is only forwarded to exec and to the logger.
func PageExecute(ctx context.Context, concurrentCount int, pageSize int, input []interface{}, exec func(ctx2 context.Context, onePageInput []interface{}) ([]interface{}, error)) []interface{} {
	fun := "PageExecute -->"
	results := make([]interface{}, 0)
	if len(input) == 0 {
		return results
	}

	// A non-positive page size would otherwise loop forever (0) or slice with
	// inverted bounds (negative). Clamping to len(input) also keeps the index
	// arithmetic below free of overflow for very large page sizes.
	if pageSize <= 0 || pageSize > len(input) {
		pageSize = len(input)
	}
	pageCount := (len(input) + pageSize - 1) / pageSize

	// At least one worker is required to drain the page channel; extra
	// workers beyond the number of pages would only sit idle.
	workers := concurrentCount
	if workers <= 0 {
		workers = 1
	}
	if workers > pageCount {
		workers = pageCount
	}

	// Each page owns exactly one slot, so workers never write to the same
	// element and no result channel or lock is needed.
	outputs := make([][]interface{}, pageCount)
	pageIndexes := make(chan int)

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for idx := range pageIndexes {
				start := idx * pageSize
				end := start + pageSize
				if end > len(input) {
					end = len(input)
				}
				// The three-index slice caps capacity so an append inside exec
				// cannot overwrite elements belonging to the next page.
				onePageInput := input[start:end:end]
				out, err := exec(ctx, onePageInput)
				if err != nil {
					xlog.Warnf(ctx, "%s execute error: input: %v, err: %v", fun, onePageInput, err)
				}
				outputs[idx] = out
			}
		}()
	}

	for idx := 0; idx < pageCount; idx++ {
		pageIndexes <- idx
	}
	close(pageIndexes)
	wg.Wait()

	for _, out := range outputs {
		results = append(results, out...)
	}
	return results
}
// ProduceCosumeExecute delegates to PageExecute with a page size of 1, so
// every slice handed to exec holds at most one element of input. The merged
// outputs collected by PageExecute are returned unchanged.
func ProduceCosumeExecute(ctx context.Context, concurrentCount int, input []interface{}, exec func(ctx2 context.Context, onePageInput []interface{}) ([]interface{}, error)) []interface{} {
	const singleElementPage = 1
	merged := PageExecute(ctx, concurrentCount, singleElementPage, input, exec)
	return merged
}
// ProduceCosumeExecuteV2 calls exec once for every element of input, using up
// to concurrentCount worker goroutines, and returns the concatenation of all
// outputs.
//
// Behavior notes:
//   - Outputs are concatenated in the order of the corresponding input
//     elements.
//   - exec is never called when input is empty.
//   - concurrentCount <= 0 is treated as 1; no more workers than elements are
//     started.
//   - When exec returns an error it is logged as a warning and processing
//     continues; output returned alongside the error is still included.
//
// The returned slice is never nil. ProduceCosumeExecuteV2 does not watch ctx
// for cancellation itself; ctx is only forwarded to exec and to the logger.
func ProduceCosumeExecuteV2(ctx context.Context, concurrentCount int, input []interface{}, exec func(ctx2 context.Context, one interface{}) ([]interface{}, error)) []interface{} {
	fun := "ProduceCosumeExecuteV2 -->"
	results := make([]interface{}, 0)
	if len(input) == 0 {
		return results
	}

	// At least one worker is required to drain the index channel; extra
	// workers beyond the number of elements would only sit idle.
	workers := concurrentCount
	if workers <= 0 {
		workers = 1
	}
	if workers > len(input) {
		workers = len(input)
	}

	// Each input element owns exactly one slot, so workers never write to the
	// same element and no result channel or lock is needed.
	outputs := make([][]interface{}, len(input))
	indexes := make(chan int)

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for idx := range indexes {
				one := input[idx]
				out, err := exec(ctx, one)
				if err != nil {
					xlog.Warnf(ctx, "%s execute error: input: %v, err: %v", fun, one, err)
				}
				outputs[idx] = out
			}
		}()
	}

	for idx := range input {
		indexes <- idx
	}
	close(indexes)
	wg.Wait()

	for _, out := range outputs {
		results = append(results, out...)
	}
	return results
}
